package com.ty.generator.renren.utils;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoCommandException;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.ty.generator.renren.config.MongoManager;
import com.ty.generator.renren.entity.mongo.MongoDefinition;
import com.ty.generator.renren.entity.mongo.Type;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

/** @author: gxz 514190950@qq.com */
public class MongoScanner {
  private Logger logger = LoggerFactory.getLogger(getClass());

  private MongoCollection<Document> collection;

  private final int scanCount;

  private List<String> colNames;

  private MongoDefinition mongoDefinition;

  private static final int[] TYPE = {3, 16, 18, 8, 9, 2, 1};

  private static final int ARRAY_TYPE = 4;

  private static final int MAX_COUNT = 200000;

  private static final int DEFAULT_COUNT = 100000;

  public MongoScanner(MongoCollection<Document> collection) {
    this.collection = collection;
    this.scanCount = DEFAULT_COUNT;
    scan();
  }

  private void scan() {
    // 初始化
    initColNames();
    // 解析属性值
    mongoDefinition = scanType();
    MongoManager.putInfo(collection.getNamespace().getCollectionName(), mongoDefinition);
    // 解析完成之后释放链接资源
    this.collection = null;
  }

  public MongoDefinition getProduct() {
    return mongoDefinition;
  }

  /**
   * 功能描述:分组发送聚合函数(获得一级属性名)
   *
   * @author : gxz
   */
  public List<String> groupAggregation(Integer skip, Integer limit) throws MongoCommandException {
    skip = skip == null ? 0 : skip;
    limit = limit == null ? scanCount : limit;
    MongoCollection<Document> collection = this.collection;
    BasicDBObject $project =
        new BasicDBObject(
            "$project",
            new BasicDBObject("arrayofkeyvalue", new BasicDBObject("$objectToArray", "$$ROOT")));
    BasicDBObject $unwind = new BasicDBObject("$unwind", "$arrayofkeyvalue");
    BasicDBObject $skip = new BasicDBObject("$skip", skip);
    BasicDBObject $limit = new BasicDBObject("$limit", limit);
    BasicDBObject filed = new BasicDBObject("_id", "null");
    filed.append("allkeys", new BasicDBObject("$addToSet", "$arrayofkeyvalue.k"));
    BasicDBObject $group = new BasicDBObject("$group", filed);
    List<BasicDBObject> dbStages = Arrays.asList($project, $skip, $limit, $unwind, $group);
    // System.out.println(dbStages);  发送的聚合函数   获得所有参数名称
    AggregateIterable<Document> aggregate = collection.aggregate(dbStages);
    Document document = aggregate.first();
    if (document == null) {
      BasicDBObject existsQuery = new BasicDBObject("$ROOT", new BasicDBObject("$exists", true));
      MongoCursor<Document> existsList = collection.find(existsQuery).limit(100).iterator();
      Set<String> keySet = new HashSet<>();
      while (existsList.hasNext()) {
        Document next = existsList.next();
        Map<String, Object> keyMap = (Document) next.get("$ROOT");
        keySet.addAll(keyMap.keySet());
      }
      return new ArrayList<>(keySet);
    } else {
      return (List<String>) document.get("allkeys");
    }
  }

  /**
   * 如果一个文档是对象类型 获得这个属性的下一级的属性名的集合 例子: user:{name:"张三",age:12} 传入user 返回[name,age]
   *
   * @param parameterName 上层参数名 这个参数名可以包含一个或多个. 注: 参数传递之前需确认: 1.上层属性一定是对象类型
   * @return 返回这个属性内的所有属性名
   */
  public Set<String> getNextParameterNames(String parameterName) {
    Document condition = new Document(parameterName, new Document("$exists", true));
    Document match = new Document("$match", condition);
    String unwindName = parameterName;
    if (parameterName.contains(".")) {
      unwindName = parameterName.split("\\.")[0];
    }
    Document unwind = new Document("$unwind", "$" + unwindName);
    Document limit = new Document("$limit", 3000);
    Document project =
        new Document("$project", new Document("list", "$" + parameterName).append("_id", false));
    Document unwind2 = new Document("$unwind", "$list");
    AggregateIterable<Document> aggregate =
        this.collection.aggregate(Arrays.asList(match, unwind, limit, project, unwind2));
    Set<String> names = new HashSet<>();
    for (Document document : aggregate) {
      Object list = document.get("list");
      if (list instanceof Map) {
        Set<String> documentNames = ((Document) list).keySet();
        names.addAll(documentNames);
      }
    }
    logger.info("解析" + parameterName + "有" + names.size() + "个子属性");
    return names;
  }

  /**
   * 功能描述:提供属性名 解析属性类型 获取相应的属性信息 封装成generator对象
   *
   * @return : 解析之后的Model {@see #MongoDefinition}
   * @param: propertyName 属性名 可以是层级名 比如 name 也可以是info.name
   * @see MongoDefinition
   */
  public MongoDefinition processNameType(String propertyName) {
    MongoCollection<Document> collection = this.collection;
    MongoDefinition result = new MongoDefinition();
    if ("_id".equals(propertyName)) {
      result.setType(2);
      result.setPropertyName("_id");
      return result;
    }
    result.setPropertyName(propertyName);
    MongoCursor<Document> isArray =
        collection
            .find(new Document(propertyName, new Document("$type", ARRAY_TYPE)))
            .limit(1)
            .iterator();
    if (isArray.hasNext()) {
      result.setArray(true);
      for (int i : TYPE) {
        MongoCursor<Document> iterator =
            collection
                .find(new Document(propertyName, new Document("$type", i)))
                .limit(1)
                .iterator();
        if (iterator.hasNext()) {
          if (i == 3) {
            result.setChild(this.produceChildList(propertyName));
          }
          // 1是double 2是string 3是对象 4是数组 16是int 18 是long
          result.setType(i);
          logger.info("解析[" + propertyName + "]是[List][" + Type.typeInfo(result.getType()) + "]");
          return result;
        }
      }
    } else {
      for (int i : TYPE) {
        MongoCursor<Document> iterator =
            collection
                .find(new Document(propertyName, new Document("$type", i)))
                .limit(1)
                .iterator();
        if (iterator.hasNext()) {
          if (i == 3) {
            result.setChild(this.produceChildList(propertyName));
          }
          // 1是double 2是string 3是对象 4是数组 16是int 18 是long
          // 到这里就是数组了
          result.setType(i);
          logger.info("解析[" + propertyName + "]是[" + Type.typeInfo(result.getType()) + "]");
          return result;
        }
      }
      result.setType(2);
    }
    logger.info("解析[" + propertyName + "]是[" + Type.typeInfo(result.getType()) + "]");
    return result;
  }

  private List<MongoDefinition> produceChildList(String parentName) {
    Set<String> nextParameterNames = this.getNextParameterNames(parentName);
    List<String> strings = new ArrayList<>(nextParameterNames);
    List<String> collect =
        strings.stream().map(name -> parentName + "." + name).collect(Collectors.toList());
    ForkJoinPool pool = new ForkJoinPool();
    ForkJoinTask<List<MongoDefinition>> task = new ForkJoinProcessType(collect);
    return pool.invoke(task);
  }

  private List<String> distinctAndJoin(List<String> a, List<String> b) {
    a.removeAll(b);
    a.addAll(b);
    return a;
  }

  /** 功能描述:解析这个集合的列名 用ForkJoin框架实现 */
  private void initColNames() {
    long start = System.currentTimeMillis();
    int scan = this.scanCount;
    long count = this.collection.countDocuments();
    ForkJoinPool pool = new ForkJoinPool();
    ForkJoinTask<List<String>> task;
    if (count > (long) scan) {
      task = new ForkJoinGetProcessName(0, scan);
    } else {
      task = new ForkJoinGetProcessName(0, (int) count);
    }
    this.colNames = pool.invoke(task);
    logger.info(
        "collection["
            + this.collection.getNamespace().getCollectionName()
            + "]初始化列名成功.....     用时: "
            + (System.currentTimeMillis() - start)
            + "毫秒");
  }

  private MongoDefinition scanType() {
    MongoDefinition result = new MongoDefinition();
    List<String> colNames = this.colNames;
    ForkJoinPool pool = new ForkJoinPool();
    ForkJoinTask<List<MongoDefinition>> task = new ForkJoinProcessType(colNames);
    List<MongoDefinition> invoke = pool.invoke(task);
    return result
        .setChild(invoke)
        .setPropertyName(this.collection.getNamespace().getCollectionName());
  }

  /** 功能描述:forkJoin多线程框架的实现 通过业务拆分解析类型 */
  class ForkJoinProcessType extends RecursiveTask<List<MongoDefinition>> {
    List<String> names;
    private final int THRESHOLD = 6;

    ForkJoinProcessType(List<String> names) {
      this.names = names;
    }

    @Override
    protected List<MongoDefinition> compute() {
      if (names.size() <= THRESHOLD) {
        List<MongoDefinition> result = new ArrayList<>();
        for (String name : names) {
          MongoDefinition childrenDefinition = processNameType(name);
          result.add(childrenDefinition);
        }
        return result;
      } else {
        int size = names.size();
        int middle = size / 2;
        List<String> leftList = names.subList(0, middle);
        List<String> rightList = names.subList(middle, size);
        ForkJoinProcessType pre = new ForkJoinProcessType(leftList);
        pre.fork();
        ForkJoinProcessType next = new ForkJoinProcessType(rightList);
        next.fork();
        return mergeList(pre.join(), next.join());
      }
    }
  }

  /** 功能描述:forkJoin多线程框架的实现 通过业务拆分获得属性名 */
  class ForkJoinGetProcessName extends RecursiveTask<List<String>> {
    private int begin; // 查询开始位置
    private int end;
    private final int THRESHOLD = 5000;

    ForkJoinGetProcessName(int begin, int end) {
      this.begin = begin;
      this.end = end;
    }

    @Override
    protected List<String> compute() {
      int count = end - begin;
      if (THRESHOLD >= count) {
        return groupAggregation(begin, count);
      } else {
        int middle = (begin + end) / 2;
        ForkJoinGetProcessName pre = new ForkJoinGetProcessName(begin, middle);
        pre.fork();
        ForkJoinGetProcessName next = new ForkJoinGetProcessName(middle + 1, end);
        next.fork();
        return distinctAndJoin(pre.join(), next.join()); // 去重合并
      }
    }
  }

  public <T> List<T> mergeList(List<T> list1, List<T> list2) {
    list1.addAll(list2);
    return list1;
  }
}
